package com.example.mq.mqclient;

import com.example.mq.common.*;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.annotations.Select;

import java.io.*;
import java.net.Socket;
import java.net.SocketException;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class Connection {
    private Socket socket = null;
    //使用哈希表 把若干个channel对象组织起来
    private ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();
    private InputStream inputStream;
    private OutputStream outputStream;
    private DataOutputStream dataOutputStream;
    private DataInputStream dataInputStream;

    private ExecutorService callbackPool = null;

    public Connection(String host, int port) throws IOException {
        socket = new Socket(host, port);
        inputStream = socket.getInputStream();
        outputStream = socket.getOutputStream();
        dataInputStream = new DataInputStream(inputStream);
        dataOutputStream = new DataOutputStream(outputStream);

        callbackPool = Executors.newFixedThreadPool(4);

        //创建一个扫描线程 由这个线程负责不停的对socket中读取响应数据 把这个响应数据再交给对应的channel负责处理
        Thread t = new Thread(() -> {
            try {
                while (!socket.isClosed()) {
                    Response response = readResponse();
                    dispatchResponse(response);
                }
            } catch (SocketException e) {
                //连接正常断开 忽略这个异常
                log.info("连接正常断开");
            } catch (IOException | ClassNotFoundException | MqException e) {
                log.info("连接异常断开");
                e.printStackTrace();
            }
        });
        t.start();
    }

    public void close() {
        //关闭Connection 释放上述资源
        try {
            callbackPool.shutdownNow();
            channelMap.clear();
            inputStream.close();
            outputStream.close();
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //使用这个方法 分别来处理
    //这个消息是 针对控制请求的响应 还是服务器推送的消息
    private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {
        if (response.getType() == 0xc) {
            //服务器推送来的消息数据
            SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());
            //根据channelId 找到对应的channel对象
            Channel channel = channelMap.get(subScribeReturns.getChannelId());
            if (channel == null) {
                throw new MqException("[Connect] 该消息对应的channel 在客户端中不存在 channelId = " + channel.getChannelId());
            }
            //执行该channel对象内部的回调
            callbackPool.submit(() -> {
                try {
                    channel.getConsumer().handleDelivery(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(),
                            subScribeReturns.getBody());
                } catch (MqException | IOException e) {
                    e.printStackTrace();
                }
            });
        } else {
            //当前响应是针对刚才的控制请求的响应
            BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());
            //把这个结果放在对应的channel的hash表中
            Channel channel = channelMap.get(basicReturns.getChannelId());
            if (channel == null) {
                throw new MqException("[Connection] 该队列对应的channel在客户端中不存在");
            }
            channel.putReturns(basicReturns);
        }
    }

    //发送请求
    public void writeRequest(Request request) throws IOException {
        dataOutputStream.writeInt(request.getType());
        dataOutputStream.writeInt(request.getLength());
        dataOutputStream.write(request.getPayload());
        dataOutputStream.flush();
        log.info("发送请求 type:{} ,length:{}", request.getType(), request.getLength());
    }


    //读取响应
    public Response readResponse() throws IOException {
        Response response = new Response();
        response.setType(dataInputStream.readInt());
        response.setLength(dataInputStream.readInt());
        byte[] payload = new byte[response.getLength()];
        int n = dataInputStream.read(payload);
        if (n != response.getLength()) {
            throw new IOException("读取的响应数据不完整");
        }
        response.setPayload(payload);
        log.info("收到响应 type:{}, length:{}", response.getType(), response.getLength());
        return response;
    }

    //创建Channel
    public Channel createChannel() throws IOException {
        String channelId = "C-" + UUID.randomUUID();
        Channel channel = new Channel(channelId, this);
        //把这个 Channel对象 放到Connection 管理channel 的哈希表中
        channelMap.put(channelId, channel);
        //同时也需要把创建channel这个消息告诉服务器
        boolean ok = channel.createChannel();
        if (!ok) {
            //整个这次创建channel操作不顺利
            //把哈希表中的键值对删除
            channelMap.remove(channelId);
            return null;
        }
        return channel;
    }

}
